home *** CD-ROM | disk | FTP | other *** search
/ Mac Easy 2010 May / Mac Life Ubuntu.iso / casper / filesystem.squashfs / usr / share / python-support / python-rdflib / rdflib / store / Concurrent.py < prev    next >
Encoding:
Python Source  |  2007-04-04  |  2.6 KB  |  94 lines

  1. from __future__ import generators
  2.  
  3. from threading import Lock
  4.  
  5. class ResponsibleGenerator(object):
  6.     """A generator that will help clean up when it is done being used."""
  7.  
  8.     __slots__ = ['cleanup', 'gen']
  9.  
  10.     def __init__(self, gen, cleanup):
  11.         self.cleanup = cleanup
  12.         self.gen = gen
  13.  
  14.     def __del__(self):
  15.         self.cleanup()
  16.  
  17.     def __iter__(self):
  18.         return self
  19.  
  20.     def next(self):
  21.         return self.gen.next()
  22.  
  23.  
  24. class Concurrent(object):
  25.  
  26.     def __init__(self, store):
  27.         self.store = store
  28.  
  29.         # number of calls to visit still in progress
  30.         self.__visit_count = 0
  31.  
  32.         # lock for locking down the indices
  33.         self.__lock = Lock()
  34.  
  35.         # lists for keeping track of added and removed triples while
  36.         # we wait for the lock
  37.         self.__pending_removes = []
  38.         self.__pending_adds = []
  39.  
  40.     def add(self, (s, p, o)):
  41.         if self.__visit_count==0:
  42.             self.store.add((s, p, o))
  43.         else:
  44.             self.__pending_adds.append((s, p, o))
  45.  
  46.     def remove(self, (subject, predicate, object)):
  47.         if self.__visit_count==0:
  48.             self.store.remove((subject, predicate, object))
  49.         else:
  50.             self.__pending_removes.append((subject, predicate, object))
  51.  
  52.     def triples(self, (subject, predicate, object)):
  53.         g = self.store.triples((subject, predicate, object))
  54.         pending_removes = self.__pending_removes
  55.         self.__begin_read()
  56.         for s, p, o in ResponsibleGenerator(g, self.__end_read):
  57.             if not (s, p, o) in pending_removes:
  58.                 yield s, p, o
  59.  
  60.         for (s, p, o) in self.__pending_adds:
  61.             if (subject==None or subject==s) and (predicate==None or predicate==p) and (object==None or object==o):
  62.                 yield s, p, o
  63.  
  64.     def __len__(self):
  65.         return self.store.__len__()
  66.  
  67.     def __begin_read(self):
  68.         lock = self.__lock
  69.         lock.acquire()
  70.         self.__visit_count = self.__visit_count + 1
  71.         lock.release()
  72.  
  73.     def __end_read(self):
  74.         lock = self.__lock
  75.         lock.acquire()
  76.         self.__visit_count = self.__visit_count - 1
  77.         if self.__visit_count==0:
  78.             pending_removes = self.__pending_removes
  79.             while pending_removes:
  80.                 (s, p, o) = pending_removes.pop()
  81.                 try:
  82.                     self.store.remove((s, p, o))
  83.                 except:
  84.                     # TODO: change to try finally?
  85.                     print s, p, o, "Not in store to remove"
  86.             pending_adds = self.__pending_adds
  87.             while pending_adds:
  88.                 (s, p, o) = pending_adds.pop()
  89.                 self.store.add((s, p, o))
  90.         lock.release()
  91.  
  92.  
  93.  
  94.